#!/usr/bin/env python3
# coding=UTF-8
# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
yr api config for user
"""
import dataclasses
import json
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Union, Optional, get_origin, Any
from enum import IntEnum
from yr.affinity import Affinity

_DEFAULT_CONNECTION_NUMS = 100
_DEFAULT_ENABLE_METRICS = False
_DEFAULT_MAX_TASK_INSTANCE_NUM = -1
_DEFAULT_MAX_CONCURRENCY_CREATE_NUM = 100
_DEFAULT_CONCURRENCY = 1
_DEFAULT_RECYCLE_TIME = 2
_DEFAULT_RPC_TIMOUT = 30 * 60
_MAX_INT = 0x7FFFFFFF
_MIN_INT = 0
NPU_RESOURCE_NAME = "NPU"
CPU_RESOURCE_NAME = "CPU"
MEMORY_RESOURCE_NAME = "Memory"


@dataclass
class UserTLSConfig:
    """
    Out cluster for user ssl/tls.
    """
    #: Path to the root certificate file.
    root_cert_path: str
    #: Path to the module certificate file.
    module_cert_path: str
    #: Path to the module key file.
    module_key_path: str
    #: Server name, defaults is ``None``.
    server_name: str = None


@dataclass
class DeploymentConfig:
    """
    Auto Deployment Configuration Class.
    """
    cpu: int = 0
    mem: int = 0
    datamem: int = 0
    spill_path: str = ""
    spill_limit: int = 0


@dataclass
class Config:
    """
    YR API config.
    """
    #: Function id which you deploy, get default by env `YRFUNCID`.
    #: etc. ``sn:cn:yrk:12345678901234561234567890123456:function:0-test-test:$latest``.
    function_id: str = ""
    #: Cpp function id which you deploy, get default by env `YR_CPP_FUNCID`.
    cpp_function_id: str = ""
    #: System cluster address, get default by env `YR_SERVER_ADDRESS`.
    server_address: str = ""
    #: DataSystem address, get default by env `YR_DS_ADDRESS`.
    ds_address: str = ""
    #: Only ``False`` when initialize in runtime, default is ``True``.
    is_driver: bool = True
    #: YR api log level have ``ERROR/WARNING/INFO/DEBUG``, default is ``WARNING``.
    log_level: Union[str, int] = "WARNING"
    #: Http client read timeout(sec), default is ``900``.
    invoke_timeout: int = 900
    #: Run code in local, default is ``False``.
    local_mode: bool = False
    #: Need set which init in runtime.
    code_dir: str = ""
    #: Http client connection nums.
    #: default is ``100``, limit: [1,∞).
    connection_nums: int = _DEFAULT_CONNECTION_NUMS
    #: Instance recycle period(sec).
    #: default is ``2 second``, limit: (0,300].
    recycle_time: int = _DEFAULT_RECYCLE_TIME
    #: If ``True`` will use DataSystem in cluster client, default: ``False``.
    in_cluster: bool = False
    #: Auto generated by init.
    job_id: str = ""
    #: For out cluster https ssl.
    tls_config: UserTLSConfig = None
    #: Auto start distribute-executor when `yr.init`, and auto stop distribute-executor when `yr.finalize`.
    #: default is ``False``.
    auto: bool = False
    #: When `auto=True` needed, use to define deployment detail.
    deployment_config: "DeploymentConfig" = None
    #: Runtime server, keep default in driver.
    rt_server_address: str = ""
    #: Log directory, specifies the path where log files will be stored.
    #: Default is the current working directory ("./").
    log_dir: str = "./"
    #: Max size for log file, default is ``0`` (If the default value is ``0``, it will eventually be set to ``40``).
    log_file_size_max: int = 0
    #: Max number for log file, default is ``0`` (If the default value is ``0``, it will eventually be set to ``20``).
    log_file_num_max: int = 0
    #: Interval for log flush, default is ``5``.
    log_flush_interval: int = 5
    #: Runtime id, keep default in driver.
    runtime_id: str = "driver"
    #: The maximum number of instances of stateless function.
    max_task_instance_num: int = _DEFAULT_MAX_TASK_INSTANCE_NUM
    #: Code loading path.
    load_paths: list = field(default_factory=list)
    #: The timeout used for RPC.
    rpc_timeout: bool = _DEFAULT_RPC_TIMOUT
    #: Whether to enable client two-way authentication, default is ``False``.
    enable_mtls: bool = False
    #: Client private key file path.
    private_key_path: str = ""
    #: Client certificate file path.
    certificate_file_path: str = ""
    #: Server certificate file path.
    verify_file_path: str = ""
    #: Server name, used to identify and connect to a specific server instance.
    server_name: str = ""
    #: Namespace, used to organize and isolate configurations or resources.
    ns: str = ""
    #: Whether to enable metric collection. ``False`` indicates disabled, and ``True`` indicates enabled.
    #: The default value is ``False``. This takes effect only when called in the cluster.
    enable_metrics: bool = False
    #: Used to set custom environment variables for the runtime. Currently, only `LD_LIBRARY_PATH` is supported.
    custom_envs: Dict[str, str] = field(default_factory=dict)
    #: Function master address list.
    master_addr_list: list = field(default_factory=list)
    #: Specify the user code or the local path location that it depends on, absolute path, and ensure that
    #: it exists on all nodes in the cluster. The default value is empty.
    working_dir: str = ""
    #: Whether to enable data system TLS authentication.
    #: If ``True``, enable data system tls authentication, else not.
    enable_ds_encrypt: bool = False
    #: The path of worker public key for data system tls authentication,
    #: if enable_ds_encrypt is true and the ds_public_key_path is empty, an exception will be thrown.
    ds_public_key_path: str = ""
    #: The path of client public key for data system tls authentication.
    #: if enable_ds_encrypt is true and the runtime_public_key_path is empty, an exception will be thrown.
    runtime_public_key_path: str = ""
    #: The path of client private key for data system tls authentication.
    #: if enable_ds_encrypt is true and the runtime_private_key_path is empty, an exception will be thrown.
    runtime_private_key_path: str = ""
    num_cpus: Optional[int] = None
    runtime_env: Optional[Dict[str, Any]] = None


@dataclass
class ClientInfo:
    """
    Use to store yr client info.
    """
    #: Automatically generated when `yr.init` is called, a unique identifier for a task.
    job_id: str


@dataclass
class Device:
    """
    Use to init xpu task
    """
    name: str = ""
    batch_size: int = 1


class SchedulingAffinityType(IntEnum):
    """
    Bundle affinity type.

    **Please use ONLY REQUIRED_AFFINITY_IN_EACH_BUNDLE.**

    **All other attributes are inherited from IntEnum and should not be used directly.**
    """
    #: Currently, only REQUIRED_AFFINITY_IN_EACH_BUNDLE is supported,
    #: indicating strong affinity within each bundle.
    REQUIRED_AFFINITY_IN_EACH_BUNDLE = 0


@dataclass
class ResourceGroupOptions:
    """
    Resource group options.
    """
    #: The name of the ResourceGroup that needs to be scheduled. By default, it is empty.
    #: indicating that it is not scheduled to any ResourceGroup.
    #: If it is not empty, it is scheduled to the specified ResourceGroup.
    resource_group_name: str = ""
    #: The index of the bundle to be scheduled takes effect only if resource_group_name is not empty.
    #: The value range is [-1, the number of bundles in the ResourceGroup).
    #: default value is ``-1``, indicating that no specific bundle is specified;
    #: if it is a value other than ``-1`` within the value range,
    #: it indicates that the bundle is scheduled to the corresponding index of the ResourceGroup;
    #: if it is any other value, an error will be generated.
    bundle_index: int = -1


@dataclass
class FunctionGroupOptions:
    """
    Function group options.
    """
    #: Required CPU size in millicores (m), limited to the range [300, 16000].
    cpu: Optional[int] = None
    #: Required memory size in MB, limited to the range [128, 65536].
    memory: Optional[int] = None
    #: Custom resources, currently supports "NPU/XX/YY", where XX is the card model such as Ascend910B4,
    #: and YY can be ``count``, ``latency``, or ``stream``.
    resources: Dict[str, float] = field(default_factory=dict)
    #: The affinity type of instances within the bundle, Default is ``None``.
    scheduling_affinity_type: Optional[SchedulingAffinityType] = None
    #: The number of function instances within each bundle.
    #: Timeout period, measured in seconds.  Restriction: ``-1, [0, 0x7FFFFFFF]``,
    #: The default value is ``-1``, indicating blocking, etc.
    scheduling_affinity_each_bundle_size: Optional[int] = None
    #: Timeout in seconds, valid values are ``-1`` or within ``[0, 0x7FFFFFFF]``. Default is ``-1``,
    #: meaning blocking wait.
    timeout: Optional[int] = None
    #: Instance concurrency, limited to the range [1, 1000].
    concurrency: Optional[int] = None
    #: Number of recovery retry attempts, used when instance recovery fails.Default is ``0``.
    recover_retry_times: int = 0


def function_group_enabled(opts: FunctionGroupOptions, group_size: int) -> bool:
    """
    if function group enabled.
    """
    if opts.scheduling_affinity_each_bundle_size is None:
        return False
    if opts.scheduling_affinity_each_bundle_size > 0 and (
            opts.scheduling_affinity_each_bundle_size <= group_size):
        return True
    if group_size == 0 and opts.scheduling_affinity_each_bundle_size == 0:
        return False
    raise RuntimeError("enable function group failed, group_size: {}, bundle_size: {}".format(
        group_size,
        opts.scheduling_affinity_each_bundle_size))


@dataclass
class DeviceInfo:
    #: 处理器 HDC 通道号
    device_id: int = 0
    #: 处理器真实网卡 IP
    device_ip: str = ""
    #: rank 的标识，rank id 从 0 开始
    rank_id: int = 0


@dataclass
class ServerInfo:
    #: 挂载到本函数实例的设备信息
    devices: List[DeviceInfo] = field(default_factory=list)
    #: 设备所在的节点 id
    server_id: str = ""


@dataclass
class FunctionGroupContext:
    """
    A context class for managing function group information.
    """
    #: The ID of this function instance within the function group.
    #: Range: [0, world_size - 1]
    #: Default value: ``0``.
    rank_id: int = 0

    #: Total number of function instances in the group.
    #: Default value: ``0``.
    world_size: int = 0

    #: Server info list for inter-instance communication.
    #: Default: empty list.
    server_list: List[ServerInfo] = field(default_factory=list)

    #: Name of the device used by this function instance, e.g., NPU/Ascend910B.
    #: Default: empty string.
    device_name: str = ""


@dataclass
class InvokeOptions:
    """Use to set the invoke options.

    Examples:
        >>> import yr
        >>> import time
        >>> yr.init()
        >>> opt = yr.InvokeOptions()
        >>> opt.pod_labels["k1"] = "v1"
        >>> @yr.invoke(invoke_options=opt)
        ... def func():
        ...     time.sleep(100)
        >>> ret = func.invoke()
        >>> yr.get(ret)
        >>> yr.finalize()
    """
    #: The size of the CPU required. Value Range is [300, 16000] and unit is m (milli-core).
    cpu: int = 500
    #: The size of memory required. Unit: MB. Range: [128, 1073741824].
    memory: int = 500
    #: Instance concurrency. Value Range is [1, 1000]. Priority is higher than
    #: the "Concurrency" configured in custom_extensions. It is recommended to use this parameter for configuration.
    concurrency: int = 1
    #: Custom resources currently support "GPU/XX/YY" and "NPU/XX/YY", where XX is the card model such as Ascend910B4,
    #: and YY can be count, latency, or stream.
    custom_resources: Dict[str, float] = field(default_factory=dict)
    custom_extensions: Dict[str, str] = field(default_factory=dict)
    """
    Specify user-defined configurations, such as function concurrency. 
    It can also be used as a user-defined tag for metrics to collect user information. 

    .. list-table:: Common `custom_extensions` configuration 

        * - "Concurrency"
          - Concurrency. Range: [1,1000]. 
        * - "lifecycle"
          - detached, supports detached mode. 
        * - "DELEGATE_DIRECTORY_INFO"
          - Custom directories support the ability to create and delete subdirectories. When an instance is created, 
            if the user-defined directory exists and has read and write permissions, a subdirectory is created under 
            it as the working directory; otherwise, a subdirectory is created under the `/tmp` directory as the working 
            directory. When the instance is destroyed, the working directory is destroyed. The user function can 
            obtain the working directory through the `INSTANCE_WORK_DIR` environment variable.
        * - "DELEGATE_DIRECTORY_QUOTA"
          - Subdirectory quota size, value range is greater than ``0 M`` and less than ``1 TB``. If this configuration 
            is not set, the default is ``512 M``. If the configuration is ``-1``, monitoring is not performed. Unit: MB.
        * - "GRACEFUL_SHUTDOWN_TIME"
          - Customize the graceful exit time, in seconds. Limit: ``>=0``, ``0`` means immediate exit, and does not 
            guarantee that the user's graceful exit function can be completed; if configured <0, the system 
            configuration at deployment time is used as the timeout time.
        * - "RECOVER_RETRY_TIMEOUT"
          - Customize the recover timeout time. The instance recover timeout time is in milliseconds. Limit: ``>0``, 
            Default to ``10 * 60 * 1000``

    When used as a user-defined tag for metrics: 
    
    >>> import yr
    >>> yr.init()
    >>> opt = yr.InvokeOptions()
    >>> opt.custom_extensions["YR_Metrics"] = "{\"endpoint\":\"127.0.0.1\", \"project_id\":\"my_project_id\"}"

    In Prometheus, select `metrics name` as `yr_app_instance_billing_invoke_latency`, and you can find the custom tag 
    information in the collected invoke information:

    .. code-block:: text

        yr_app_instance_billing_invoke_latency{
        ...
        endpoint="127.0.0.1",
        ...}
    """

    pod_labels: Dict[str, str] = field(default_factory=dict)
    """
    Pod labels only used in Kubernetes environment. When creating a function instance, pod_labels can accept key-value 
    pairs from the user and pass them to the function system. 

    * After the ActorPattern function instance specialization is completed (Running), 
      the Scaler applies the incoming labels to the POD. 
    * When an ActorPattern function instance fails or is deleted, 
      the Scaler sets the corresponding label of the POD to empty (Remove it); 
    * Constraints: 
       * The number of labels that can be stored in `pod_labels` cannot exceed 5.
       * Constraints on the key and value in `pod_labels`: 
          * key：Supports uppercase and lowercase letters, numbers, and hyphens, and allows a length of 1-63. 
              Does not start or end with a hyphen. Empty strings are not allowed. 
          * value：Supports uppercase and lowercase letters, numbers, and hyphens, with a length of 1-63. 
              Does not start or end with a hyphen. Allows empty strings.
    * Raises：
       When the `pod_labels` passed by the user does not meet the constraints, 
       the corresponding exception and error message will be thrown. 
    """

    #: Labels of instance
    labels: List[str] = field(default_factory=list)
    #: Specify the time when the invoke call of the desired heterogeneous function is completed.
    max_invoke_latency: int = 5000
    #: Specify the minimum number of instances for a stateless function.
    min_instances: int = 0
    #: Specify the maximum number of instances for a stateless function.
    max_instances: int = 0
    #: The number of instance recovery times (when an instance abnormally exits, the instance is automatically restored
    #: to the latest state). If the value is ``0``, the instance is not automatically restored when it abnormally exits.
    recover_retry_times: int = 0
    #: Whether to enable order-preserving. Only effective for stateful functions.
    need_order: bool = False
    #: Used to specify the name of the instance. When `namespace` is specified, the instance name is `namespace-name`,
    #: otherwise it is `name`.
    name: str = ""
    #: Used to specify the namespace of the instance.
    namespace: str = ""

    #: Set affinity condition list.
    schedule_affinities: List[Affinity] = field(default_factory=dict)
    #: Set whether to enable weak affinity priority scheduling. If enabled, when multiple weak affinity conditions are
    #: passed, match and score them in order. Scheduling is successful as soon as one condition is met.
    preferred_priority = True
    #: Set whether to enable strong affinity priority scheduling. If enabled, when multiple strong affinity conditions
    #: are passed, they are matched and scored in order. If none of the strong affinity conditions are met,
    #: the scheduling fails.
    required_priority = False
    #: Whether to enable anti-affinity for non-selectable resources. If enabled, scheduling fails when none of the
    #: weak affinity conditions are met. When preferred_anti_other_labels is set to True, if no PODs that meet the
    #: conditions are found for weak affinity/anti-affinity, scheduling fails and no other resources' PODs are
    #: selected for scheduling.
    preferred_anti_other_labels = True

    resource_group_options: ResourceGroupOptions = field(default_factory=ResourceGroupOptions)
    """
    Specify the ResourceGroup option, which includes resource_group_name and bundle_index. 
    
    When creating a function instance: If `resource_group_name` is set, it will be passed to the kernel to schedule to 
    the specified ResourceGroup. If both `resource_group_name` and `bundle_index` are set, they are passed to the 
    kernel to schedule the bundle to the specified ResourceGroup and index. The default value of `resource_group_name` 
    is empty, and the default value of `bundle_index` is ``-1``. 

    * Constraints: 
       * When `resource_group_name` is empty, the instance will not be scheduled to the specified ResourceGroup, 
         and the bundle_index field is not effective. 
       * When `resource_group_name` is not empty:
          * When `bundle_index` is ``-1``, the instance is scheduled to the specified ResourceGroup. 
          * When ``0<= bundle_index < number of bundles in ResourceGroup``，schedule the instance to a specified bundle
            in a specified ResourceGroup.
          * When ``bundle_index < -1`` or ``bundle_index >= number of bundles in ResourceGroup``，raise error.
    * Raises：
       * There is no ResourceGroup with the `resource_group_name` provided by the user. 
       * The user passes a non-empty `resource_group_name` and ``bundle_index < -1``.
       * When the user passes a non-empty `resource_group_name` and `bundle_index` >= the number of 
         bundles in the ResourceGroup
       * Scheduling failed: For example, the specified ResourceGroup or the specified bundle of the specified 
         ResourceGroup does not have enough resources to handle instance scheduling. 
    """

    #: Function group options.
    function_group_options: FunctionGroupOptions = field(default_factory=FunctionGroupOptions)
    #: Set environment variables when the instance starts.
    env_vars: Dict[str, str] = field(default_factory=dict)
    #: Number of retries for stateless functions.
    retry_times: int = 0
    #: Set the traceId for function calls for link tracing.
    trace_id: str = ""
    #: In the scenario where a function is invoked by a specified alias in cross-function invocation, when the
    #: alias is a rule alias, this parameter is used to set the kv parameter that the rule alias depends on.
    alias_params: Dict[str, str] = field(default_factory=dict)

    runtime_env: Dict = field(default_factory=dict)
    """
    Configure the stateful/stateless function runtime environment with `conda`, `pip`, `working_dir`, and `env_vars`.
    
    * `conda` provides different Python runtime environments for stateful function. 
       * Specify an existing conda environment (the environment exists on all nodes)
         ``runtime_env = {"conda":"pytorch_p39"}``
       * Create and use conda environments through configuration.
         ``runtime_env["conda"] = {"name":"myenv","channels": ["conda-forge"], "dependencies": ["python=3.9",
         "msgpack-python=1.0.5", "protobuf", "libgcc-ng", "cloudpickle=2.0.0", "cython=3.0.10", "pyyaml=6.0.2"]}``
       * Create and use a conda environment through a YAML file (the YAML file meets the conda requirements).
         ``runtime_env = {"conda":"/home/env.yaml"}``
    * `pip` installs dependencies for Python runtime environment.
    * `working_dir` configure the code path of the job.
    * `env_vars` configure process-level environment variables.
      ``runtime_env = {"env_vars":{"OMP_NUM_THREADS": "32", "TF_WARNINGS": "none"}}``
    * Constraints of `runtime_env`:
       * The keys supported by runtime_env are `conda`, `env_vars`, `pip`, `working_dir`. 
         Other keys will not take effect and will not cause errors. 
       * Run the yr function with conda. The environment needs to have yr and its third-party dependencies. It is 
         recommended that users first create a conda environment and then specify it with `runtime_env`, for example:
         ``runtime_env = {"conda":"pytorch_p39"}``
       * `runtime_env` supports creating and switching conda environments using configurations. The configuration needs
         to install third-party dependencies for yr, for example: 
         ``runtime_env["conda"] = {"name":"myenv","channels": ["conda-forge"], "dependencies": ["python=3.9",
         "msgpack-python=1.0.5", "protobuf", "libgcc-ng", "cloudpickle=2.0.0", "cython=3.0.10", "pyyaml=6.0.2"]}``
       * The environment created using conda in `runtime_env` needs to be cleaned up by the user. 
       * In `runtime_env`, conda can use `pip` to install dependencies, which are managed directly by conda. 
         ``runtime_env = {"conda":{'name': 'my_project_env', 'channels': ['defaults', 'conda-forge'], 
         'dependencies': ['python=3.9', {'pip': ['requests==2.25.1']}]}}``
       * Currently, Python 3.9 and Python 3.11 SDKs are available. The Python version of conda needs to 
         be consistent with the SDK version. 
       * If both `InvokeOptions.env_vars` and `InvokeOptions.runtime_env["env_vars"]` are configured with the same key,
         the configuration in `InvokeOptions.env_vars` will be used. 
       * If `InvokeOptions.runtime_env["working_dir"]` is configured, use this configuration, 
         otherwise, use `YR.Config.working_dir` and finally use the configuration in `InvokeOptions.env_vars`.
       * If you use conda, you need to specify the environment variable `YR_CONDA_HOME` to point to installation path. 
    """

    def check_options_valid(self):
        """
        Check whether the options are valid.

        Raises:
            TypeError: If options are invalid, throw this exception.
        """

        attributes_to_check = [
            ("env_vars", Dict[str, str]),
            ("name", str),
            ("namespace", str),
            ("preferred_anti_other_labels", bool),
            ("preferred_priority", bool),
            ("trace_id", str),
            ("custom_resources", Dict[str, float]),
            ("custom_extensions", Dict[str, str]),
        ]
        for actual, expected_type in attributes_to_check:
            actual_value = getattr(self, actual)
            if actual_value is None:
                continue
            origin = get_origin(expected_type)
            check_type = origin if origin is not None else expected_type
            if not isinstance(actual_value, check_type):
                raise TypeError(
                    f"invalid type for '{actual}', actual: {type(actual_value)}, expect: {expected_type}"
                )


    def check_options_range(self):
        """
        Check whether the options are in the valid range.
        """
        attrs = [
            "retry_times",
            "recover_retry_times",
            "max_instances",
            "max_invoke_latency",
            "min_instances",
       ]

        for attr in attrs:
            value = getattr(self, attr)
            if attr in ["max_instances", "max_invoke_latency"] and value == -1:
                continue
            if not _MIN_INT <= value <= _MAX_INT:
                raise ValueError(
                    f"{attr} 超过范围, 请输入 {_MIN_INT} 到 {_MAX_INT} 范围的值"
                )


def dataclass_from_dict(klass, d):
    """
    parse dataclass from dict
    """
    try:
        field_types = {f.name: f.type for f in dataclasses.fields(klass)}
        return klass(**{f: dataclass_from_dict(field_types[f], d[f]) for f in d})
    except Exception:
        return d  # Not a dataclass field


@dataclass
class MetaFunctionID:
    """
    meta function id
    """
    cpp: str = ""
    python: str = ""
    java: str = ""


@dataclass
class MetaConfig:
    """
    TaskMetadata is used to convey control information
    {
        "jobID": "xxx",
        "codePath": "xx",
        "recycleTime": 2,
        "maxTaskInstanceNum": -1,
        "maxConcurrencyCreateNum": 100,
        "enableMetrics": false,
        "threadPoolSize": 10,
        "functionID": {
            "cpp": "xxx",
            "python": "xxx",
            "java": "xxx"
        }
    }
    """
    jobID: str
    codePath: str
    recycleTime: int = _DEFAULT_RECYCLE_TIME
    maxTaskInstanceNum: int = _DEFAULT_MAX_TASK_INSTANCE_NUM
    maxConcurrencyCreateNum: int = _DEFAULT_MAX_CONCURRENCY_CREATE_NUM
    enableMetrics: bool = _DEFAULT_ENABLE_METRICS
    threadPoolSize: int = 10
    functionID: MetaFunctionID = field(default_factory=MetaFunctionID)
    functionMasters: list = field(default_factory=list)

    @classmethod
    def parse(cls, data):
        """
        parse TaskMetadata from json or dict

        Returns: TaskMetadata object
        """
        self = dataclass_from_dict(MetaConfig, json.loads(data))
        return self

    def to_json(self):
        """
        convert to json
        Returns: json
        """
        return json.dumps(asdict(self))
